// Copyright (c) 2021-2024 SigScalr, Inc.
//
// This file is part of SigLens Observability Solution
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <http://www.gnu.org/licenses/>.

package processor

import (
	"fmt"
	"io"

	"github.com/siglens/siglens/pkg/segment/query/iqr"
	"github.com/siglens/siglens/pkg/segment/results/segresults"
	"github.com/siglens/siglens/pkg/segment/structs"
	sutils "github.com/siglens/siglens/pkg/segment/utils"
	"github.com/siglens/siglens/pkg/utils"
)

// statisticExprProcessor post-processes stats aggregation results according
// to a StatisticExpr and exposes the outcome as an IQR.
//
// Field overview (as used within this file):
//   - options: the StatisticExpr taken from the query aggregators; Process
//     returns an error when it is nil.
//   - statsExpr: the StatsExpr taken from the query aggregators; it is handed
//     to NewStatsProcessor when this processor is constructed.
//   - statsProcessor: the wrapped stats processor that consumes non-nil input
//     IQRs which do not already carry an aggregation result.
//   - groupByColumns: the GroupByKeys of the most recently processed bucket;
//     passed to CreateStatsResults when the result IQR is built.
//   - qid: the query id copied from the most recent non-nil input IQR; used
//     to create the result IQR.
//   - finalAggregationResults: the aggregation results stored by
//     processAggregationResult; the source for the result IQR.
//   - hasFinalResult: set once a result IQR has been built successfully;
//     consulted by GetFinalResultIfExists.
//   - setAsIqrStatsResults: set by SetAsIqrStatsResults; when true, Process
//     returns the wrapped stats processor's extractFinalStatsResults output
//     once the input is exhausted.
type statisticExprProcessor struct {
	options                 *structs.StatisticExpr
	statsExpr               *structs.StatsExpr
	statsProcessor          *statsProcessor
	groupByColumns          []string
	qid                     uint64
	finalAggregationResults map[string]*structs.AggregationResult
	hasFinalResult          bool
	setAsIqrStatsResults    bool
}

// NewStatisticExprProcessor builds a statisticExprProcessor from the given
// query aggregators. The StatisticExpr and StatsExpr are taken from options,
// and a stats processor is created for the StatsExpr so that raw input can be
// aggregated before the statistic expression is applied.
//
// options must be non-nil; it is dereferenced unconditionally.
func NewStatisticExprProcessor(options *structs.QueryAggregators) *statisticExprProcessor {
	return &statisticExprProcessor{
		options:        options.StatisticExpr,
		statsExpr:      options.StatsExpr,
		statsProcessor: NewStatsProcessor(options.StatsExpr),
	}
}

// SetAsIqrStatsResults forwards the request to the wrapped stats processor and
// records the choice on this processor. Once the flag is set, Process returns
// the output of the stats processor's extractFinalStatsResults when the input
// is exhausted, instead of post-processing the bucket results itself.
func (p *statisticExprProcessor) SetAsIqrStatsResults() {
	p.statsProcessor.SetAsIqrStatsResults()
	p.setAsIqrStatsResults = true
}

// Process consumes one input IQR, or finalizes the results when inputIQR is nil.
//
// Non-nil input:
//   - the query id is remembered;
//   - if the IQR already carries a stats aggregation result (per the original
//     note: for GroupByCMD queries the searcher sets it), that result is
//     post-processed directly;
//   - otherwise the IQR is handed to the wrapped stats processor.
//
// Nil input (no more data):
//   - io.EOF is returned if the stats processor holds no search results;
//   - if SetAsIqrStatsResults was called, the statistic expression is attached
//     to the search results' aggregators and the stats processor's final
//     results are returned (io.EOF from that call is passed through as-is);
//   - otherwise the accumulated bucket results are post-processed here.
//
// An error is returned when the processor was built without a StatisticExpr.
func (p *statisticExprProcessor) Process(inputIQR *iqr.IQR) (*iqr.IQR, error) {
	if p.options == nil {
		return nil, utils.TeeErrorf("statisticExprProcessor.Process: options is nil")
	}

	if inputIQR != nil {
		p.qid = inputIQR.GetQID()

		// A pre-populated aggregation result takes precedence over running
		// the stats processor on the records.
		if searcherAggs := inputIQR.GetStatsAggregationResult(); searcherAggs != nil {
			return p.processAggregationResult(searcherAggs)
		}

		return p.statsProcessor.Process(inputIQR)
	}

	// From here on the input is exhausted.
	switch {
	case p.statsProcessor.searchResults == nil:
		return nil, io.EOF
	case !p.setAsIqrStatsResults:
		return p.processAggregationResult(p.statsProcessor.searchResults.GetBucketResults())
	}

	p.statsProcessor.searchResults.GetAggs().StatisticExpr = p.options

	statsIQR, err := p.statsProcessor.extractFinalStatsResults()
	if err == nil || err == io.EOF {
		return statsIQR, err
	}

	return nil, fmt.Errorf("statisticExprProcessor.Process: Error extracting stats as IqrStatsResults: %v", err)
}

// Rewind rewinds the wrapped stats processor. The state cached on this
// processor itself (finalAggregationResults, hasFinalResult, groupByColumns)
// is not touched here.
func (p *statisticExprProcessor) Rewind() {
	p.statsProcessor.Rewind()
}

// Cleanup delegates to the wrapped stats processor's Cleanup; this processor
// performs no additional cleanup of its own.
func (p *statisticExprProcessor) Cleanup() {
	p.statsProcessor.Cleanup()
}

// GetFinalResultIfExists returns the final result IQR, rebuilt from the stored
// aggregation results, together with true when a final result has previously
// been produced. It returns (nil, false) when no final result exists yet or
// when rebuilding the IQR fails with anything other than io.EOF.
func (p *statisticExprProcessor) GetFinalResultIfExists() (*iqr.IQR, bool) {
	if !p.hasFinalResult {
		return nil, false
	}

	finalIQR, err := p.getIQRFromAggregationResults()
	// io.EOF accompanies a successfully built IQR, so it is not a failure.
	if err == nil || err == io.EOF {
		return finalIQR, true
	}

	return nil, false
}

// processAggregationResult applies the statistic expression to every
// aggregation result in aggResults and returns the outcome as an IQR.
//
// aggResults is modified in place. For each aggregation result that has at
// least one bucket:
//  1. the ElemCount of all buckets is summed into a total;
//  2. the buckets are ordered via StatisticExpr.SortBucketResult;
//  3. every remaining bucket has the stats-generated "count(*)" removed
//     (unless "count(*)" is one of the statistic fields), fields outside the
//     expression removed, and the count/percent values filled in;
//  4. if UseOther is set, one extra row covering the elements not represented
//     by the remaining buckets is appended.
//
// The processed map is stored on the processor and converted to an IQR by
// getIQRFromAggregationResults, whose return values (including io.EOF on
// success) are passed through unchanged.
func (p *statisticExprProcessor) processAggregationResult(aggResults map[string]*structs.AggregationResult) (*iqr.IQR, error) {
	// Name of the count column generated by the stats group-by block.
	const countName = "count(*)"

	statsFields := p.options.GetFields()

	countIsGroupByCol := utils.SliceHas(statsFields, p.options.StatisticOptions.CountField)
	percentIsGroupByCol := utils.SliceHas(statsFields, p.options.StatisticOptions.PercentField)
	// Loop-invariant: whether the stats-generated count must survive because
	// the statistic expression itself refers to it.
	keepStatsCount := utils.SliceHas(statsFields, countName)

	for _, aggregationResult := range aggResults {
		// A nil entry carries no buckets; skip it rather than dereference it.
		if aggregationResult == nil || len(aggregationResult.Results) == 0 {
			continue
		}

		// The total must be taken before sorting so that it still covers every
		// bucket should the sort step drop any.
		resTotal := uint64(0)
		for _, bucketResult := range aggregationResult.Results {
			resTotal += bucketResult.ElemCount
		}

		// Sort results according to requirements
		if err := p.options.SortBucketResult(&aggregationResult.Results); err != nil {
			return nil, fmt.Errorf("statisticExprProcessor.Process: %v", err)
		}

		// otherCnt ends up as the number of elements not covered by the
		// buckets that are still present after sorting.
		otherCnt := resTotal
		for _, bucketResult := range aggregationResult.Results {
			// Delete count generated by the stats groupby block
			if !keepStatsCount {
				delete(bucketResult.StatRes, countName)
			}

			// Delete fields not in statistic expr
			if err := p.options.RemoveFieldsNotInExprForBucketRes(bucketResult); err != nil {
				return nil, fmt.Errorf("statisticExprProcessor.Process: %v", err)
			}

			otherCnt -= bucketResult.ElemCount

			// Set the appropriate column to the computed value
			if countIsGroupByCol || percentIsGroupByCol {
				if err := p.options.OverrideGroupByCol(bucketResult, resTotal); err != nil {
					return nil, fmt.Errorf("statisticExprProcessor.Process: %v", err)
				}
			}

			if p.options.StatisticOptions.ShowCount && !countIsGroupByCol {
				p.options.SetCountToStatRes(bucketResult.StatRes, bucketResult.ElemCount)
			}

			if p.options.StatisticOptions.ShowPerc && !percentIsGroupByCol {
				p.options.SetPercToStatRes(bucketResult.StatRes, bucketResult.ElemCount, resTotal)
			}

			p.groupByColumns = bucketResult.GroupByKeys
		}

		// If useother=true, a row representing all other values is added to
		// the results. The first remaining bucket serves as the template, so
		// the row is only added when at least one bucket is left after sorting.
		if p.options.StatisticOptions.UseOther && len(aggregationResult.Results) > 0 {
			otherBucketRes, err := p.newOtherBucketResult(aggregationResult.Results[0], otherCnt, resTotal, countIsGroupByCol, percentIsGroupByCol)
			if err != nil {
				return nil, err
			}

			aggregationResult.Results = append(aggregationResult.Results, otherBucketRes)
		}
	}

	p.finalAggregationResults = aggResults

	return p.getIQRFromAggregationResults()
}

// newOtherBucketResult builds the "other" row for one aggregation result.
//
// template supplies the group-by keys and the set of stat columns. Every
// group-by key and stat column is filled with OtherStr, except those named
// like the count or percent field, which are left unset here and then
// populated exactly as for regular buckets. otherCnt is the element count of
// the row and resTotal the total used for the percentage.
func (p *statisticExprProcessor) newOtherBucketResult(template *structs.BucketResult, otherCnt uint64, resTotal uint64, countIsGroupByCol bool, percentIsGroupByCol bool) (*structs.BucketResult, error) {
	groupByKeys := template.GroupByKeys

	bucketKey := make([]interface{}, len(groupByKeys))
	for i, groupByKey := range groupByKeys {
		if groupByKey == p.options.StatisticOptions.CountField || groupByKey == p.options.StatisticOptions.PercentField {
			continue
		}
		bucketKey[i] = p.options.StatisticOptions.OtherStr
	}

	otherEnclosure := sutils.CValueEnclosure{
		Dtype: sutils.SS_DT_STRING,
		CVal:  p.options.StatisticOptions.OtherStr,
	}
	statRes := make(map[string]sutils.CValueEnclosure)
	for key := range template.StatRes {
		if key == p.options.StatisticOptions.CountField || key == p.options.StatisticOptions.PercentField {
			continue
		}
		statRes[key] = otherEnclosure
	}

	otherBucketRes := &structs.BucketResult{
		ElemCount:   otherCnt,
		StatRes:     statRes,
		BucketKey:   bucketKey,
		GroupByKeys: groupByKeys,
	}

	if countIsGroupByCol || percentIsGroupByCol {
		if err := p.options.OverrideGroupByCol(otherBucketRes, resTotal); err != nil {
			return nil, fmt.Errorf("statisticExprProcessor.Process: %v", err)
		}
	}

	if p.options.StatisticOptions.ShowCount && !countIsGroupByCol {
		p.options.SetCountToStatRes(statRes, otherCnt)
	}

	if p.options.StatisticOptions.ShowPerc && !percentIsGroupByCol {
		p.options.SetPercToStatRes(statRes, otherCnt, resTotal)
	}

	return otherBucketRes, nil
}

// getIQRFromAggregationResults converts the stored aggregation results into a
// fresh IQR holding stats results.
//
// It returns (nil, io.EOF) when no aggregation results have been stored. On
// success it marks the processor as having a final result and returns the IQR
// together with io.EOF. If the stats results cannot be attached to the IQR, a
// non-EOF error is returned and the processor state is left unchanged.
func (p *statisticExprProcessor) getIQRFromAggregationResults() (*iqr.IQR, error) {
	if p.finalAggregationResults == nil {
		return nil, io.EOF
	}

	resultIQR := iqr.NewIQR(p.qid)

	maxBuckets := int(sutils.QUERY_MAX_BUCKETS)
	buckets, measureFuncs, numBuckets := segresults.CreateMeasResultsFromAggResults(maxBuckets, p.finalAggregationResults)

	if err := resultIQR.CreateStatsResults(buckets, measureFuncs, p.groupByColumns, numBuckets); err != nil {
		return nil, utils.TeeErrorf("qid=%v, statisticExprProcessor.Process: cannot create stats results; err=%v", resultIQR.GetQID(), err)
	}

	p.hasFinalResult = true

	return resultIQR, io.EOF
}
